package com.ruyuan.eshop.promotion.mq.consumer.listener;

import com.alibaba.fastjson.JSON;
import com.ruyuan.eshop.common.concurrent.NamedDaemonThreadFactory;
import com.ruyuan.eshop.common.core.JsonResult;
import com.ruyuan.eshop.common.message.PlatformCouponMessage;
import com.ruyuan.eshop.promotion.domain.dto.SalesPromotionCouponItemDTO;
import com.ruyuan.eshop.promotion.service.CouponItemService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
 * @author zhonghuashishan
 */
@Slf4j
@Component
public class PlatFormCouponListener implements MessageListenerConcurrently {

    /**
     * 优惠券服务service
     */
    @Autowired
    private CouponItemService couponItemService;

    // 测试completableFuture使用commonPool的时是不需要初始化业务ThreadPoolExecutor的
    // 这里用supplier懒加载让测试completableFuture使用commonPool时不要初始化线程池
    // 只有当使用completableFuture使用自定义的线程时才初始化线程池
    private static final int PERMITS = 30;
    private static final AtomicBoolean initializedRef = new AtomicBoolean(false);
    private static ThreadPoolExecutor THREAD_POOL_EXECUTOR = null;
    private static final Supplier<ThreadPoolExecutor> THREAD_POOL_EXECUTOR_SUPPLIER = () -> {
        if (initializedRef.compareAndSet(false, true)) {
            THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
                    PERMITS, PERMITS * 2,
                    60,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(1000),
                    NamedDaemonThreadFactory.getInstance("consumeCouponMsg"),
                    new ThreadPoolExecutor.CallerRunsPolicy());
        }
        return THREAD_POOL_EXECUTOR;
    };
    /**
     * 并发消费消息
     * @param msgList
     * @param context
     * @return
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
        try {

            // 方式一：使用默认的commonPool来处理任务
            // supplyAsync(Supplier<U> supplier) API
            // 默认使用的是 ForkJoinPool.commonPool() 这个线程池
            // 该线程池在jvm内是唯一的，默认的线程数量是cpu的核数减1
            // 如果觉得不线程数不够用可以通过jvm系统参数 java.util.concurrent.ForkJoinPool.common.parallelism 的值调整commonPool的并行度，或者采用方式二
            /*List<CompletableFuture<SalesPromotionCouponItemDTO>> futureList = msgList.stream()
                    .map(e -> CompletableFuture.supplyAsync(() -> handleMessageExt(e)))
                    .collect(Collectors.toList());*/

            // 方式二：使用自定的业务线程池来处理任务
            List<CompletableFuture<SalesPromotionCouponItemDTO>> futureList = msgList.stream()
                    .map(e -> CompletableFuture.supplyAsync(() -> handleMessageExt(e), THREAD_POOL_EXECUTOR_SUPPLIER.get()))
                    .collect(Collectors.toList());

            List<SalesPromotionCouponItemDTO> couponItemDTOList = futureList.stream()
                    .map(CompletableFuture::join)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());

            // 优惠券保存到数据库
            couponItemService.saveCouponBatch(couponItemDTOList);
        }catch (Exception e){
            log.error("consume error,平台优惠券消费失败", e);
            // 本次消费失败，下次重新消费
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }

        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    public SalesPromotionCouponItemDTO handleMessageExt(MessageExt messageExt) {
        log.debug("执行平台发放优惠券消费消息逻辑，消息内容：{}", messageExt.getBody());
        String msg = new String(messageExt.getBody());
        PlatformCouponMessage platformCouponMessage = JSON.parseObject(msg , PlatformCouponMessage.class);
        log.info("开始发放平台优惠券，couponId:{}", platformCouponMessage.getCouponId());

        //幂等逻辑防止重复消费
        JsonResult<Long> result = couponItemService.
                selectByAccountIdAndCouponId(platformCouponMessage.getUserAccountId(),
                        platformCouponMessage.getCouponId());
        // 如果已经存在，直接跳过循环，不再执行优惠券保存操作
        if(result.getSuccess()){
            return null;
        }

        SalesPromotionCouponItemDTO itemDTO = new SalesPromotionCouponItemDTO();
        itemDTO.setCouponId(platformCouponMessage.getCouponId());
        itemDTO.setCouponType(platformCouponMessage.getCouponType());
        itemDTO.setUserAccountId(platformCouponMessage.getUserAccountId());
        itemDTO.setIsUsed(0);
        itemDTO.setActivityStartTime(platformCouponMessage.getActivityStartTime());
        itemDTO.setActivityEndTime(platformCouponMessage.getActivityEndTime());
        return itemDTO;
    }

}
